package com.atguigu.gmall.mq.receiver;

import com.atguigu.gmall.mq.config.DeadLetterMqConfig;
import com.atguigu.gmall.mq.config.DelayedMqConfig;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @author: atguigu
 * @create: 2023-06-27 10:30
 */
@Slf4j
@Component
public class DelayMsgReceiver {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 由于通过注册Bean方式创建相关交换机队列,故只需要指定监听队列名称即可
     * 监听延迟消息
     *
     * @param msg
     * @param channel
     * @param message
     */
    @SneakyThrows
    @RabbitListener(queues = DelayedMqConfig.queue_delay_1)
    public void processDelayMsg(String msg, Channel channel, Message message) {
        if (StringUtils.isNotBlank(msg)) {
            //保证消息幂等性 采用Redis set  nx 命令
            String key = "mq:" + msg;//业务唯一标识
            Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "", 5, TimeUnit.MINUTES);
            if (!flag) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            //说明第一次处理业务
            log.info("[mqdemo]监听到了延迟消息,处理业务:{}", msg);
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
